package com.smartlazy.delay.queue.core.timer;

import com.smartlazy.delay.queue.core.bean.BucketItem;
import com.smartlazy.delay.queue.core.bean.JobItem;
import com.smartlazy.delay.queue.core.service.DelayBucket;
import com.smartlazy.delay.queue.core.service.DelayJob;
import com.smartlazy.delay.queue.core.service.ReadyQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Calendar;

/**
 *
 * @author howe
 */
@Slf4j
public class TickHandler implements Runnable {
    private String bucketName;

    @Autowired
    private DelayBucket delayBucket;

    @Autowired
    private DelayJob delayJob;

    @Autowired
    private ReadyQueue readyQueue;

    public TickHandler(String bucketName,
                       DelayBucket delayBucket,
                       DelayJob delayJob,
                       ReadyQueue readyQueue) {
        this.bucketName = bucketName;
        this.delayBucket = delayBucket;
        this.delayJob = delayJob;
        this.readyQueue = readyQueue;
    }

    @Override
    public void run() {
        log.info(bucketName + " is running");
        try {
            BucketItem item = delayBucket.getFromBucket(this.bucketName);
            if (item == null) {
                return;
            }
            long now = Calendar.getInstance().getTimeInMillis();

            // 延迟时间未到
            if (item.getTimestamp() > now) {
                return;
            }

            // 延迟时间小于等于当前时间, 取出Job元信息并放入ready queue
            JobItem jobItem = delayJob.getJob(item.getJobId());
            // job元信息不存在, 从bucket中删除
            if (jobItem == null) {
                log.warn("job元信息不存在, 从bucket中删除。bucketName:" + bucketName + " job:" + item.getJobId());
                delayBucket.removeFromBucket(bucketName, item.getJobId());
                return;
            }

            // 再次确认元信息中delay是否小于等于当前时间
            if (jobItem.getTimestamp() > now) {
                // 从bucket中删除旧的jobId
                delayBucket.removeFromBucket(bucketName, item.getJobId());
                // 重新计算delay时间并放入bucket中
                delayBucket.pushToBucket(bucketName, jobItem.getTimestamp(), item.getJobId());
                return;
            }
            long ret = readyQueue.pushToReadyQueue(jobItem.getTopic(), item.getJobId());
            if (ret == 0) {
                log.error("JobId放入ready queue失败。bucketName:" + bucketName + " job:" + jobItem.toString());
                return;
            }
            // 从bucket中删除
            delayBucket.removeFromBucket(bucketName, item.getJobId());
            log.info("JobId放入ready queue。bucketName:" + bucketName + " job:" + jobItem.toString());
        } catch (Exception e) {
            log.error("获取Job元信息失败 " + bucketName, e);
        }
    }
}
